Source code for hysop.operator.base.derivative

# Copyright (c) HySoP 2011-2024
#
# This file is part of HySoP software.
# See "https://particle_methods.gricad-pages.univ-grenoble-alpes.fr/hysop-doc/"
# for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from abc import ABCMeta, abstractmethod
import sympy as sm

from hysop.tools.numpywrappers import npw
from hysop.tools.htypes import check_instance, to_tuple, first_not_None, InstanceOf
from hysop.tools.decorators import debug
from hysop.tools.sympy_utils import exponent, subscript, partial
from hysop.constants import DirectionLabels, SpaceDiscretization
from hysop.parameters.tensor_parameter import TensorParameter
from hysop.symbolic.relational import Assignment

from hysop.core.memory.memory_request import MemoryRequest
from hysop.core.graph.graph import op_apply
from hysop.topology.cartesian_descriptor import CartesianTopologyDescriptors
from hysop.fields.continuous_field import Field, ScalarField
from hysop.operator.base.spectral_operator import SpectralOperatorBase


[docs] class SpaceDerivativeBase(metaclass=ABCMeta): """ Common implementation interface for derivative operators. """ @debug def __new__( cls, F, dF, A=None, derivative=None, direction=None, variables=None, name=None, pretty_name=None, require_tmp=None, **kwds, ): return super().__new__( cls, name=name, pretty_name=pretty_name, input_fields=None, output_fields=None, input_params=None, **kwds, ) @debug def __init__( self, F, dF, A=None, derivative=None, direction=None, variables=None, name=None, pretty_name=None, require_tmp=None, **kwds, ): """ SpaceDerivative operator base. Compute the derivative of a field in a given direction on a given backend, possibly scaled by another field/parameter/value. dF = A * dF/dxj Parameters ---------- F: hysop.field.continuous_field.ScalarField Continuous field as input. dF: hysop.field.continuous_field.ScalarField Continuous field to be written. Some backend may allow inplace differentiation. A: numerical value, ScalarParameter or ScalarField, optional Scaling for convenience, defaults to 1. derivative: int or tuple, optional Which derivative to generate, defaults to (0,)*(dim-1)+(1,). ie. first order derivative in X axis. If integer is given, the derivative is taken in given direction. direction: int, optional Directions in which to take the derivative. Defaults to None. Should be None if derivative is a tuple. require_tmp: bool, optional Should this operator generate a tmp array ? Default to True if F is dF (inplace computation) else False. variables: dict Dictionary of fields as keys and topology descriptors as values. name: str, optional Name of this operator pretty_name: str, optional Pretty name of this operator. kwds: dict, optional Base class keyword arguments. Notes ----- There is two way to build a derivative: (1) derivative(int) + direction(int) gives: => derivative=(0,0,0,0,kd,0,0,0) where the index of kd is direction and kd=derivative (2) derivative(tuple) + direction(None) gives: => derivative=(k0,...,kn) """ assert derivative is not None A = first_not_None(A, 1) check_instance(F, ScalarField) check_instance(dF, ScalarField, allow_none=True) if isinstance(derivative, tuple): assert len(derivative) == F.dim else: direction = first_not_None(direction, 0) _derivative = [ 0, ] * F.dim _derivative[direction] = derivative derivative = tuple(_derivative) check_instance(derivative, tuple, size=F.dim, minval=0) nz_derivatives = tuple(x for x in derivative if (x > 0)) if len(nz_derivatives) == 1: directional_derivative = nz_derivatives[0] _direction = derivative.index(directional_derivative) if direction is not None: assert _direction == direction else: direction = _direction else: assert direction is None directional_derivative = None expr = F.s() for i, xi in enumerate(F.domain.frame.coords): if derivative[i] > 0: expr = expr.diff(xi, derivative[i]) _dF = F.from_sympy_expression( expr=expr, space_symbols=F.domain.frame.coords, is_tmp=True ) default_name = _dF.name default_pretty_name = _dF.pretty_name if dF is None: dF = _dF check_instance(derivative, tuple, size=F.dim, minval=0) check_instance(direction, int, minval=0, maxval=F.dim - 1, allow_none=True) check_instance(directional_derivative, int, minval=0, allow_none=True) name = first_not_None(name, default_name) pretty_name = first_not_None(pretty_name, default_pretty_name) variables = first_not_None(variables, {}) check_instance(name, str) check_instance(pretty_name, str) check_instance(variables, dict, keys=Field, values=CartesianTopologyDescriptors) input_fields = {F: variables.get(F, None)} output_fields = {dF: variables.get(dF, input_fields[F])} input_params = set() is_inplace = dF is F require_tmp = first_not_None(require_tmp, is_inplace) scale_by_field, scale_by_parameter, scale_by_value = (False, False, False) if isinstance(A, ScalarField): input_fields[A] = variables.get(A, input_fields[F]) scale_by_field = True elif isinstance(A, TensorParameter): input_params.update({A}) scale_by_parameter = True elif isinstance(A, (float, int, npw.number, sm.Basic)): scale_by_value = (A != 1) and (A != 1.0) else: msg = f"Unknown type for scaling parameter {type(A)}." raise TypeError(A) super().__init__( name=name, pretty_name=pretty_name, input_fields=input_fields, output_fields=output_fields, input_params=input_params, **kwds, ) self.Fin = F self.Fout = dF self.A = A self.derivative = derivative self.directional_derivative = directional_derivative self.direction = direction self.is_inplace = is_inplace self.require_tmp = require_tmp self.scale_by_field = scale_by_field self.scale_by_parameter = scale_by_parameter self.scale_by_value = scale_by_value
[docs] @debug def discretize(self): if self.discretized: return super().discretize() self.dFin = self.get_input_discrete_field(self.Fin) self.dFout = self.get_output_discrete_field(self.Fout) A = self.A if A in self.input_discrete_fields: self.dA = self.input_discrete_fields[A] else: self.dA = A
[docs] @debug def get_work_properties(self): requests = super().get_work_properties() if self.require_tmp: request = MemoryRequest.empty_like( a=self.dFout, nb_components=1, shape=self.dFout.compute_resolution, backend=self.backend, ) requests.push_mem_request("tmp", request) return requests
[docs] @debug def setup(self, work): super().setup(work) if work is None: raise ValueError("work is None.") if self.require_tmp: (self.dtmp,) = work.get_buffer(self, "tmp")
[docs] class FiniteDifferencesSpaceDerivativeBase(SpaceDerivativeBase): def __new__(cls, **kwds): return super().__new__(cls, **kwds) def __init__(self, **kwds): super().__init__(**kwds) directional_derivative, direction = self.directional_derivative, self.direction msg = "FiniteDifferencesSpaceDerivative only supports directional derivatives." assert isinstance(direction, int), msg assert isinstance(directional_derivative, int), msg
[docs] @classmethod def default_method(cls): dm = super().default_method() dm.update({SpaceDiscretization: 2}) return dm
[docs] @classmethod def available_methods(cls): am = super().available_methods() am.update({SpaceDiscretization: InstanceOf(int)}) return am
[docs] @debug def handle_method(self, method): super().handle_method(method) if not hasattr(self, "space_discretization"): space_discretization = method.pop(SpaceDiscretization) assert 2 <= space_discretization, space_discretization assert space_discretization % 2 == 0, space_discretization self.space_discretization = space_discretization
[docs] class SpectralSpaceDerivativeBase(SpectralOperatorBase, SpaceDerivativeBase): @debug def __new__(cls, testing=False, **kwds): kwds["require_tmp"] = False return super().__new__(cls, **kwds) @debug def __init__(self, testing=False, **kwds): kwds["require_tmp"] = False super().__init__(**kwds) F, dF = self.Fin, self.Fout derivative = self.derivative tg = self.new_transform_group() axes = tuple(i for (i, di) in enumerate(derivative[::-1]) if (di > 0)) if testing and not axes: axes = tuple(range(F.dim)) elif not axes: msg = f"No transform axes found, got derivative={derivative}." raise RuntimeError(msg) Ft = tg.require_forward_transform(F, axes=axes, custom_output_buffer="auto") Bt = tg.require_backward_transform( dF, axes=axes, custom_input_buffer="auto", matching_forward_transform=Ft ) assert Ft.output_dtype == Bt.input_dtype dFt = Ft.s assert len(derivative) == F.dim for i, di in enumerate(derivative): if di > 0: dFt = dFt.diff(Ft.s.all_vars[i], di) expr = Assignment(Bt.s, dFt) kds = tg.push_expressions(expr) self.Ft = Ft self.Bt = Bt self.tg = tg self.kds = kds self.expr = expr
[docs] def discretize(self): super().discretize() dkds, nd_dkds = (), () for kd in self.kds: _, dkd, nd_dkd = self.tg.discrete_wave_numbers[kd] dkds += (dkd,) nd_dkds += (nd_dkd,) self.dkds = dkds self.nd_dkds = nd_dkds